{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "182ce5dd",
   "metadata": {},
   "source": [
    "# Loading CSV data in SecretFlow"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8d6c44d1",
   "metadata": {},
   "source": [
    "The following code is for demonstration only. Due to system security concerns, please **DO NOT** use it directly in production."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "01776225",
   "metadata": {},
   "source": [
    "This tutorial will demonstrate, through several examples, how to load CSV data in SecretFlow and utilize it for data processing and modeling."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e7323a0a",
   "metadata": {},
   "source": [
    "## Setting up the environment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "c2842348",
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext autoreload\n",
    "%autoreload 2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "68274f5f",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "2023-04-13 13:45:04,665\tINFO worker.py:1538 -- Started a local Ray instance.\n"
     ]
    }
   ],
   "source": [
    "import secretflow as sf\n",
    "\n",
    "# Check the version of your SecretFlow\n",
    "print('The version of SecretFlow: {}'.format(sf.__version__))\n",
    "\n",
    "# In case you have a running secretflow runtime already.\n",
    "sf.shutdown()\n",
    "sf.init(['alice', 'bob', 'charlie'], address=\"local\", log_to_driver=True)\n",
    "alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "8af24c46",
   "metadata": {},
   "source": [
    "## Introduction to the interface"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "ffd77211",
   "metadata": {},
   "source": [
    "SecretFlow provides an interface similar to `pandas.read_csv` that reads CSV data from different parties and combines it into a single federated DataFrame.\n",
    "\n",
    "- For horizontal scenarios, use the `secretflow.data.horizontal.read_csv` [API](https://www.secretflow.org.cn/docs/secretflow/en/source/secretflow.data.horizontal.html#secretflow.data.horizontal.read_csv).\n",
    "- For vertical scenarios, use the `secretflow.data.vertical.read_csv` [API](https://www.secretflow.org.cn/docs/secretflow/en/source/secretflow.data.vertical.html#secretflow.data.vertical.read_csv).\n",
    "\n",
    "By using `read_csv`, you can read CSV files from multiple parties and create a FedDataFrame."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "531f5302",
   "metadata": {},
   "source": [
    "**Build Federated Table**  \n",
    "A federated table is a virtual concept that spans multiple parties.\n",
    "\n",
    "1. Data from each party in the federated table is stored locally and is not allowed to leave the domain.\n",
    "2. Except for the party that owns the data, no one else can access the data storage.\n",
    "3. Any operation on the federated table is scheduled by the Driver to each Worker, and the execution instructions are passed layer by layer until they reach the Python Runtime of the specific Worker that owns the data. The framework ensures that data can only be operated on when the Worker.device and Object.device are the same.\n",
    "4. The federated table is designed to manage and manipulate multi-party data from a central perspective.\n",
    "5. The interface aligns with `pandas.DataFrame` to reduce the cost of multi-party data operations.\n",
    "\n",
    "<img alt=\"vdataframe.png\" src=\"resources/vdataframe.png\" width=\"600\">  "
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "b6b2af40",
   "metadata": {},
   "source": [
    "## Data Download and Splitting"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "ed65fa48",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "E0413 13:45:12.618292213  130484 fork_posix.cc:76]           Other threads are currently calling into gRPC, skipping fork() handlers\n"
     ]
    }
   ],
   "source": [
    "%%capture\n",
    "%%!\n",
    "wget https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/iris/iris.csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "bdf1afd6",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "alldata_df = pd.read_csv(\"./iris.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "88df4de9",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "150"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "len(alldata_df)"
   ]
  },
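  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "9ff06f1b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Note: `.loc` slicing is label-based and inclusive at both ends,\n",
    "# so the row labeled 70 appears in both Alice's and Bob's partitions.\n",
    "h_alice_df = alldata_df.loc[:70]\n",
    "h_bob_df = alldata_df.loc[70:]"
   ]
  },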
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "6bd4a699",
   "metadata": {},
   "source": [
    "Save each horizontally split DataFrame to its own CSV file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "66d07e39",
   "metadata": {},
   "outputs": [],
   "source": [
    "# save the data to local file system\n",
    "import tempfile\n",
    "\n",
    "_, h_alice_path = tempfile.mkstemp()\n",
    "_, h_bob_path = tempfile.mkstemp()\n",
    "h_alice_df.to_csv(h_alice_path, index=False)\n",
    "h_bob_df.to_csv(h_bob_path, index=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "609b6f57",
   "metadata": {},
   "outputs": [],
   "source": [
    "v_alice_df = alldata_df.loc[:, ['sepal_length', 'sepal_width']]\n",
    "v_bob_df = alldata_df.loc[:, ['petal_length', 'petal_width', 'class']]"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "43f0863d",
   "metadata": {},
   "source": [
    "Save each vertically split DataFrame to its own CSV file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "c51826c9",
   "metadata": {},
   "outputs": [],
   "source": [
    "# save the data to local file system\n",
    "_, v_alice_path = tempfile.mkstemp()\n",
    "_, v_bob_path = tempfile.mkstemp()\n",
    "v_alice_df.to_csv(v_alice_path, index=True, index_label=\"id\")\n",
    "v_bob_df.to_csv(v_bob_path, index=True, index_label=\"id\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "24b6916a",
   "metadata": {},
   "source": [
    "## Loading CSV Data Example for Horizontal Scenario"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "713e09fe",
   "metadata": {},
   "outputs": [],
   "source": [
    "from secretflow.data.horizontal import read_csv\n",
    "from secretflow.security.aggregation.plain_aggregator import PlainAggregator\n",
    "from secretflow.security.compare.plain_comparator import PlainComparator\n",
    "from secretflow.data.split import train_test_split"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "da57e9e3",
   "metadata": {},
   "source": [
    "First, prepare the CSV data files for the two parties. In a horizontal scenario, both parties' data must share the same schema.\n",
    "\n",
    "- Alice: datapath (the local path accessible on Alice's machine)\n",
    "- Bob: datapath (the local path accessible on Bob's machine)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7a0d4ae1",
   "metadata": {},
   "source": [
    "In a horizontal scenario, data with the same schema is distributed across multiple parties, so certain DataFrame operations require cross-domain computation.\n",
    "The `read_csv` interface therefore takes an `aggregator` and a `comparator`. To protect data privacy, a secure aggregator and a secure comparator can be specified; this tutorial uses `PlainAggregator` and `PlainComparator`."
   ]
  },
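  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "5b1e9a70",
   "metadata": {},
   "source": [
    "To see why an aggregator is needed, consider computing a global mean over rows held by different parties. The next cell is a minimal plain-Python sketch of the idea, not SecretFlow's implementation: the helpers `local_stats` and `aggregate_mean` and the sample values are hypothetical, and nothing in it is secure. Each party contributes only a local sum and a row count, and the aggregating party combines them. A secure aggregator would perform the same combination while hiding each party's individual contribution."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e4c27d19",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Illustrative sketch only: a plaintext stand-in for what an aggregator does.\n",
    "# `local_stats` and `aggregate_mean` are hypothetical, not SecretFlow APIs.\n",
    "from typing import Dict, List, Tuple\n",
    "\n",
    "\n",
    "def local_stats(values: List[float]) -> Tuple[float, int]:\n",
    "    # Runs inside one party's domain: only the sum and the count leave it.\n",
    "    return sum(values), len(values)\n",
    "\n",
    "\n",
    "def aggregate_mean(partials: Dict[str, Tuple[float, int]]) -> float:\n",
    "    # Runs on the aggregating party, using only the per-party partial results.\n",
    "    total = sum(s for s, _ in partials.values())\n",
    "    count = sum(n for _, n in partials.values())\n",
    "    return total / count\n",
    "\n",
    "\n",
    "# Made-up sample values standing in for one column split across two parties.\n",
    "partials = {\n",
    "    'alice': local_stats([5.1, 4.9, 4.7]),\n",
    "    'bob': local_stats([6.3, 5.8]),\n",
    "}\n",
    "global_mean = aggregate_mean(partials)\n",
    "print(round(global_mean, 2))"
   ]
  },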
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "9c4ffb66",
   "metadata": {},
   "outputs": [],
   "source": [
    "path_dict = {alice: h_alice_path, bob: h_bob_path}\n",
    "\n",
    "aggregator = PlainAggregator(charlie)\n",
    "comparator = PlainComparator(charlie)\n",
    "\n",
    "hdf = read_csv(filepath=path_dict, aggregator=aggregator, comparator=comparator)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "d36b91b5",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Index(['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'class'], dtype='object')"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "hdf.columns"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "e0c6a4ae",
   "metadata": {},
   "source": [
    "The resulting `hdf` is a FedDataFrame, and we can now perform data processing on it.\n",
    "For reference, see [Data Preprocessing with FedDataFrame](https://www.secretflow.org.cn/docs/secretflow/en/tutorial/data_preprocessing_with_data_frame.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "8bec80d9",
   "metadata": {},
   "outputs": [],
   "source": [
    "label = hdf[\"class\"]\n",
    "data = hdf.drop(columns=\"class\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "3f89bc5c",
   "metadata": {},
   "source": [
    "The resulting `data` and `label` can be passed to FLModel or SLModel for modeling."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "dfc3eb3f",
   "metadata": {},
   "source": [
    "SecretFlow provides `train_test_split` to split data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "9e065f08",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data, test_data = train_test_split(\n",
    "    data, train_size=0.8, shuffle=True, random_state=1234\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "16dadfc1",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{alice: (56, 4), bob: (64, 4)} {alice: (15, 4), bob: (16, 4)}\n"
     ]
    }
   ],
   "source": [
    "print(train_data.partition_shape(), test_data.partition_shape())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "849e2e9d",
   "metadata": {},
   "source": [
    "## Loading CSV Data Example for Vertical Scenario"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "52ea476a",
   "metadata": {},
   "source": [
    "First, prepare the CSV data files for the two parties. In a vertical scenario, the parties' data does not need to share the same schema.\n",
    "The `read_csv` interface has built-in support for PSI (private set intersection).\n",
    "\n",
    "- Alice: datapath (the local path accessible on Alice's machine)\n",
    "- Bob: datapath (the local path accessible on Bob's machine)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "75308a31",
   "metadata": {},
   "source": [
    "In a vertical scenario, the parties' schemas differ, but each party holds its columns in full, so no comparator or aggregator is required. However, the rows held by each party are not necessarily aligned, so we align them through PSI when reading.\n",
    "\n",
    "- path_dict: data path of each party\n",
    "- spu: SPU device used for intersection\n",
    "- keys: keys for intersection (multi-column intersection is supported)\n",
    "- drop_keys: ID column name to be deleted after intersection"
   ]
  },
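  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "7d3f0b6a",
   "metadata": {},
   "source": [
    "The effect of `keys` and `drop_keys` can be illustrated in plain Python. The next cell is a sketch of the *result* of alignment only: it compares IDs in the clear, which is exactly what a PSI protocol avoids, and the helper `align_on_key` and the sample rows are hypothetical. Each party keeps only the rows whose ID is in the intersection, sorts them by ID so that row positions match across parties, and then drops the ID column."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2c9a41fe",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Illustrative sketch only: plaintext alignment on a shared key column.\n",
    "# `align_on_key` is a hypothetical helper, not a SecretFlow API.\n",
    "from typing import Dict, List, Set\n",
    "\n",
    "\n",
    "def align_on_key(rows: List[Dict], common_keys: Set, key: str, drop_key: bool = True) -> List[Dict]:\n",
    "    # Keep rows whose key is shared, sort by key, then optionally drop the key.\n",
    "    kept = sorted((r for r in rows if r[key] in common_keys), key=lambda r: r[key])\n",
    "    if drop_key:\n",
    "        return [{k: v for k, v in r.items() if k != key} for r in kept]\n",
    "    return kept\n",
    "\n",
    "\n",
    "# Made-up rows: the two parties hold different columns and different ID sets.\n",
    "alice_rows = [{'id': 2, 'sepal_length': 4.7}, {'id': 0, 'sepal_length': 5.1}, {'id': 3, 'sepal_length': 4.6}]\n",
    "bob_rows = [{'id': 0, 'class': 'Iris-setosa'}, {'id': 1, 'class': 'Iris-setosa'}, {'id': 2, 'class': 'Iris-setosa'}]\n",
    "\n",
    "# In the clear here; PSI computes this intersection without revealing other IDs.\n",
    "common = {r['id'] for r in alice_rows} & {r['id'] for r in bob_rows}\n",
    "alice_aligned = align_on_key(alice_rows, common, key='id')\n",
    "bob_aligned = align_on_key(bob_rows, common, key='id')\n",
    "print(sorted(common), alice_aligned, bob_aligned)"
   ]
  },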
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "c0665453",
   "metadata": {},
   "outputs": [],
   "source": [
    "spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "8a7e0a93",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<secretflow.device.device.spu.SPU at 0x7f3b808347f0>"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spu"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "4b30b5e0",
   "metadata": {},
   "outputs": [],
   "source": [
    "from secretflow.data.vertical import read_csv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "aca648d6",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001B[2m\u001B[36m(SPURuntime pid=23157)\u001B[0m 2023-04-13 13:45:34.913 [error] [context.cc:operator():132] connect to rank=1 failed with error [external/yacl/yacl/link/transport/channel_brpc.cc:368] send, rpc failed=112, message=[E111]Fail to connect Socket{id=0 addr=127.0.0.1:44893} (0x0x55850693c900): Connection refused [R1][E112]Not connected to 127.0.0.1:44893 yet, server_id=0 [R2][E112]Not connected to 127.0.0.1:44893 yet, server_id=0 [R3][E112]Not connected to 127.0.0.1:44893 yet, server_id=0\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:36.308 [error] [context.cc:operator():132] connect to rank=0 failed with error [external/yacl/yacl/link/transport/channel_brpc.cc:368] send, rpc failed=112, message=[E111]Fail to connect Socket{id=0 addr=127.0.0.1:24875} (0x0x562301a903c0): Connection refused [R1][E112]Not connected to 127.0.0.1:24875 yet, server_id=0 [R2][E112]Not connected to 127.0.0.1:24875 yet, server_id=0 [R3][E112]Not connected to 127.0.0.1:24875 yet, server_id=0\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Init:228] bucket size set to 1048576\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Run:97] Begin sanity check for input file: /tmp/tmp0y82frfo, precheck_switch:true\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:37.313 [info] [csv_checker.cc:CsvChecker:121] Executing duplicated scripts: LC_ALL=C sort --buffer-size=1G --temporary-directory=/tmp --stable selected-keys.1681364737312870835 | LC_ALL=C uniq -d > duplicate-keys.1681364737312870835\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:115] End sanity check for input file: /tmp/tmp0y82frfo, size=150\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:133] Skip doing psi, because dataset has been aligned!\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:178] Begin post filtering, indices.size=150, should_sort=true\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:37.317 [info] [utils.cc:MultiKeySort:88] Executing sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316884224 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316884224\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:37.320 [info] [utils.cc:MultiKeySort:90] Finished sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316884224 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316884224, ret=0\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23518)\u001B[0m 2023-04-13 13:45:37.320 [info] [bucket_psi.cc:Run:216] End post filtering, in=/tmp/tmp0y82frfo, out=/tmp/tmp0y82frfo.psi_output_94874\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23517)\u001B[0m 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Init:228] bucket size set to 1048576\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23517)\u001B[0m 2023-04-13 13:45:37.312 [info] [bucket_psi.cc:Run:97] Begin sanity check for input file: /tmp/tmp5xjv6qs8, precheck_switch:true\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23517)\u001B[0m 2023-04-13 13:45:37.313 [info] [csv_checker.cc:CsvChecker:121] Executing duplicated scripts: LC_ALL=C sort --buffer-size=1G --temporary-directory=/tmp --stable selected-keys.1681364737313158626 | LC_ALL=C uniq -d > duplicate-keys.1681364737313158626\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23517)\u001B[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:115] End sanity check for input file: /tmp/tmp5xjv6qs8, size=150\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23517)\u001B[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:133] Skip doing psi, because dataset has been aligned!\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23517)\u001B[0m 2023-04-13 13:45:37.316 [info] [bucket_psi.cc:Run:178] Begin post filtering, indices.size=150, should_sort=true\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23517)\u001B[0m 2023-04-13 13:45:37.316 [info] [utils.cc:MultiKeySort:88] Executing sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316796390 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316796390\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23517)\u001B[0m 2023-04-13 13:45:37.319 [info] [utils.cc:MultiKeySort:90] Finished sort scripts: tail -n +2 /tmp/tmp-sort-in-1681364737316796390 | LC_ALL=C sort --buffer-size=3G --parallel=8 --temporary-directory=./ --stable --field-separator=, --key=1,1 >>/tmp/tmp-sort-out-1681364737316796390, ret=0\n",
      "\u001B[2m\u001B[36m(SPURuntime pid=23517)\u001B[0m 2023-04-13 13:45:37.319 [info] [bucket_psi.cc:Run:216] End post filtering, in=/tmp/tmp5xjv6qs8, out=/tmp/tmp5xjv6qs8.psi_output_94874\n"
     ]
    }
   ],
   "source": [
    "path_dict = {\n",
    "    alice: v_alice_path,  # The path that alice can access\n",
    "    bob: v_bob_path,  # The path that bob can access\n",
    "}\n",
    "\n",
    "# Prepare the SPU device\n",
    "spu = sf.SPU(sf.utils.testing.cluster_def(['alice', 'bob']))\n",
    "\n",
    "vdf = read_csv(path_dict, spu=spu, keys='id', drop_keys=\"id\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "9d01c4d7",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Index(['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'class'], dtype='object')"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "vdf.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "7640532b",
   "metadata": {},
   "outputs": [],
   "source": [
    "label = vdf[\"class\"]\n",
    "data = vdf.drop(columns=\"class\")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "cc9d4ee4",
   "metadata": {},
   "source": [
    "We can also use `train_test_split` to split data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "69a79fbb",
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data, test_data = train_test_split(\n",
    "    data, train_size=0.8, shuffle=True, random_state=1234\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "id": "6640aa76",
   "metadata": {},
   "source": [
    "## Next, try your own CSV data"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
